package com.xiaofan.apitest.source

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

import java.util.Random

/**
 * Demo job that reads from a user-defined Flink source function and prints the stream.
 */
object CustomSourceTest {
  def main(args: Array[String]): Unit = {
    // Obtain the execution environment for the current context.
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // streamEnv.setParallelism(1)

    // Attach the custom source and request a parallelism of 2 for the source operator.
    val sensorSource = new MySensorSourceFunction
    val readings: DataStream[SensorReading] = streamEnv.addSource(sensorSource).setParallelism(2)

    // Sink: write every reading to standard output.
    readings.print()

    // Nothing runs until the job is submitted here.
    streamEnv.execute("custom source test")
  }
}

/**
 * Source that emits simulated temperature readings for ten sensors (`sensor_1` .. `sensor_10`).
 *
 * Each sensor starts at a random temperature in [0, 100) and then performs a random walk:
 * roughly every 500 ms a standard-Gaussian step is added to every sensor's value and one
 * [[SensorReading]] per sensor is emitted, all stamped with the same wall-clock time.
 *
 * The class implements `ParallelSourceFunction` so that callers may set a parallelism
 * greater than 1 on the source operator; a plain `SourceFunction` is rejected by Flink for
 * any parallelism other than 1. Every parallel instance generates its own independent set
 * of readings for the same ten sensor ids.
 */
class MySensorSourceFunction extends ParallelSourceFunction[SensorReading] {

  // Loop flag for run(). Flink invokes cancel() from a different thread than the one
  // executing run(), so the flag must be volatile for the write to become visible.
  @volatile var running: Boolean = true

  /**
   * Emits readings in a loop until [[cancel]] is called.
   *
   * @param ctx Flink source context used to emit the generated readings
   */
  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {

    val rand = new Random()

    // Initial state: (sensor id, temperature) for ten sensors, temperature in [0, 100).
    var curTemp = 1.to(10).map(i => ("sensor_" + i, rand.nextDouble() * 100))

    // Keep producing data until the source is cancelled.
    while (running) {
      // Random-walk step: nudge every temperature by a standard-Gaussian delta.
      curTemp = curTemp.map { case (id, temp) => (id, temp + rand.nextGaussian()) }

      // One timestamp is shared by all readings of this round.
      val curTime: Long = System.currentTimeMillis()

      curTemp.foreach { case (id, temp) => ctx.collect(SensorReading(id, curTime, temp)) }

      // Pause 500 ms between rounds.
      Thread.sleep(500)
    }
  }

  /** Requests termination of the emit loop in [[run]]. */
  override def cancel(): Unit = running = false
}














